Laravel 异步队列事件
下面是一个完整的Laravel示例,展示如何使用事件和Redis队列异步执行耗时任务:
1. 创建事件类
php artisan make:event OrderImEvent
app/Events/OrderImEvent.php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class OrderImEvent
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public $immMsgType;
public $playersId;
public $userId;
public $actionRole;
public $order;
/**
* Create a new event instance.
*
* @return void
*/
public function __construct($immMsgType, $playersId, $userId, $actionRole, $order)
{
$this->immMsgType = $immMsgType;
$this->playersId = $playersId;
$this->userId = $userId;
$this->actionRole = $actionRole;
$this->order = $order;
}
}
2. 创建监听器类(实现ShouldQueue接口)
php artisan make:listener ProcessOrderImNotification --event=OrderImEvent
app/Listeners/ProcessOrderImNotification.php
<?php
namespace App\Listeners;
use App\Events\OrderImEvent;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
class ProcessOrderImNotification implements ShouldQueue
{
use InteractsWithQueue;
// 自定义队列连接
public $connection = 'redis';
// 自定义队列名称
public $queue = 'im_notifications';
/**
* 处理事件
*/
public function handle(OrderImEvent $event)
{
// 获取事件数据
$immMsgType = $event->immMsgType;
$playersId = $event->playersId;
$userId = $event->userId;
$actionRole = $event->actionRole;
$order = $event->order;
// 这里是你的耗时任务逻辑
$this->sendImNotification(
$immMsgType,
$playersId,
$userId,
$actionRole,
$order
);
}
/**
* 发送IM通知(模拟耗时任务)
*/
protected function sendImNotification($immMsgType, $playersId, $userId, $actionRole, $order)
{
// 实际业务逻辑 - 这可能是耗时操作
// 例如:调用第三方IM服务API、处理大数据等
// 模拟耗时操作(实际使用时删除)
sleep(120);
logger()->info('IM通知已发送', [
'msg_type' => $immMsgType,
'players' => $playersId,
'user_id' => $userId,
'role' => $actionRole,
'order_id' => $order->id
]);
}
/**
* 处理失败任务
*/
public function failed(OrderImEvent $event, $exception)
{
// 失败处理逻辑
logger()->error('IM通知发送失败: ' . $exception->getMessage(), [
'order_id' => $event->order->id,
'user_id' => $event->userId
]);
}
}
3. 注册事件监听器
app/Providers/EventServiceProvider.php
<?php
namespace App\Providers;
use App\Events\OrderImEvent;
use App\Listeners\ProcessOrderImNotification;
use Illuminate\Foundation\Support\Providers\EventServiceProvider as ServiceProvider;
class EventServiceProvider extends ServiceProvider
{
protected $listen = [
OrderImEvent::class => [
ProcessOrderImNotification::class,
],
];
// ...
}
4. 触发事件(在控制器中)
<?php
namespace App\Http\Controllers;
use App\Events\OrderImEvent;
use App\Models\Order;
use App\Enums\IMMsgType;
use App\Enums\ActionRole;
class OrderController extends Controller
{
public function placeOrder()
{
// 创建订单逻辑...
$order = Order::create([...]);
// 获取玩家ID列表
$playersId = [/* 玩家ID数组 */];
// 触发事件(异步执行)
event(new OrderImEvent(
IMMsgType::AfterPlaceOrder,
$playersId,
$order->user_id,
ActionRole::User,
$order
));
return response()->json(['success' => true]);
}
}
5. 配置Redis队列
.env 文件
QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
REDIS_QUEUE=default
6. 启动队列处理器
# 处理指定队列
php artisan queue:work redis --queue=im_notifications
# 或者在后台运行(使用Supervisor管理)
# 更多选项:--tries=3 --timeout=300
7. 监控队列(可选)
使用Laravel Horizon进行高级监控:
composer require laravel/horizon
# 发布配置文件
php artisan horizon:install
config/horizon.php
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['im_notifications'], // 监控我们的队列
'processes' => 5, // 进程数
'tries' => 3,
'timeout' => 300,
],
],
],
常用命令
horizon:clear : 删除指定队列中的所有任务。
horizon:clear-metrics : 删除所有任务和队列的指标数据。
horizon:continue : 指示主监控器(master supervisor)继续处理任务。
horizon:continue-supervisor : 指示指定的监控器继续处理任务。
horizon:forget : 删除一个失败的队列任务。
horizon:install : 安装所有 Horizon 资源。
horizon:list : 列出所有已部署的机器。
horizon:pause : 暂停主监控器。
horizon:pause-supervisor : 暂停一个指定的监控器。
horizon:publish : 发布所有 Horizon 资源。
horizon:purge : 终止所有不正常的 Horizon 进程。
horizon:snapshot : 存储队列指标的快照。
horizon:status : 获取 Horizon 的当前状态。
horizon:supervisor-status : 显示指定监控器的状态。
horizon:supervisors : 列出所有监控器。
horizon:terminate : 终止主监控器,以便可以重新启动它。
关键点说明:
-
异步处理:
- 事件触发后立即返回响应
- 耗时任务在队列中异步执行
- 不会阻塞主请求流程
-
队列配置:
- 使用Redis作为队列驱动
- 自定义队列名称
im_notifications - 支持失败任务重试和日志记录
-
优势:
- 解耦业务逻辑
- 提高系统响应速度
- 支持任务重试和错误处理
- 可水平扩展(增加更多队列worker)
-
处理长时间任务:
- 在队列worker中设置
--timeout参数 - 在
config/queue.php中配置重试次数:
- 在队列worker中设置
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 300, // 秒
'block_for' => null,
],